深入浅出 Java 多线程
点击上方 Java后端,选择 设为星标
优质文章,及时送达
作者:AAAhxz001
链接:blog.csdn.net/weixin_44104367/article/details/104481510
线程
线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程。
1.1 线程与进程的区别
进程:指在系统中正在运行的一个应用程序;程序一旦运行就是进程;进程——资源分配的最小单位。
线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个单元执行流。线程——程序执行的最小单位。
也就是,进程可以包含多个线程,而线程是程序执行的最小单位。
我把 Java 相关的文章整理成了 PDF ,关注微信公众号 Java后端 回复 666 下载。
1.2 线程的状态
NEW:线程刚创建 RUNNABLE: 在JVM中正在运行的线程,其中运行状态可以有运行中RUNNING和READY两种状态,由系统调度进行状态改变。 BLOCKED: 线程处于阻塞状态,等待监视锁,可以重新进行同步代码块中执行 WAITING : 等待状态 TIMED_WAITING: 调用sleep() join() wait()方法可能导致线程处于等待状态 TERMINATED: 线程执行完毕,已经退出
1.3 Notify 和 wait :
首先看源码给出的解释,这里翻译了一下:
唤醒一个正在等待这个对象的线程监控。如果有任何线程正在等待这个对象,那么它们中的一个被选择被唤醒。选择是任意的,发生在执行的酌情权。一个线程等待一个对象通过调用一个{@code wait}方法进行监视。
导致当前线程等待,直到另一个线程调用
{@link java.lang.Object#notify()}方法或
{@link java.lang.Object#notifyAll()}方法。
换句话说,这个方法的行为就像它简单一样
执行调用{@code wait(0)}。
当前线程必须拥有该对象的监视器。线程
释放此监视器的所有权,并等待另一个线程
通知等待该对象的监视器的线程唤醒
通过调用{@code notify}方法或
{@code notifyAll}方法。然后线程等待,直到它可以重新取得监视器的所有权,然后继续执行。
它们最大本质的区别是:sleep()不释放同步锁,wait()释放同步锁.
还有用法的上的不同是:sleep(milliseconds)可以用时间指定来使他自动醒过来,如果时间不到你只能调用interreput()来强行打断;wait()可以用notify()直接唤起.
这两个方法来自不同的类分别是Thread和Object 最主要是sleep方法没有释放锁,而wait方法释放了锁,使得其他线程可以使用同步控制块或者方法。
1.4 Thread.sleep() 和Thread.yield()的异同
1. 相同 : sleep()和yield()都会释放CPU。
2. 不同: sleep()使当前线程进入停滞状态,所以执行sleep()的线程在指定的时间内肯定不会执行;yield()只是使当前线程重新回到可执行状态,所以执行yield()的线程有可能在进入到可执行状态后马上又被执行。
1.5 补充:死锁的概念
互斥条件:顾名思义,线程对资源的访问是排他性,当该线程释放资源后下一线程才可进行占用 请求和保持:简单来说就是自己拿的不放手又等待新的资源到手。
线程T1至少已经保持了一个资源R1占用,但又提出对另一个资源R2请求,而此时,资源R2被其他线程T2占用,于是该线程T1也必须等待,但又对自己保持的资源R1不释放。不可剥夺:在没有使用完资源时,其他线性不能进行剥夺 循环等待:一直等待对方线程释放资源
1.6 补充:并发和并行的区别
并行:是指同一时刻同时处理多任务的能力。当有多个线程在操作时,cpu同时处理这些线程请求的能力。
区别就在于CPU是否能同时处理所有任务,并发不能,并行能
1.7 补充:线程安全三要素
1.8 补充:如何实现线程安全
互斥同步:synchronized、lock 非阻塞同步:CAS 无需同步的方案:如果一个方法本来就不涉及共享数据,那它自然就无需任何同步操作去保证正确性
1.9 补充:保证线程安全的机制:
synchronized关键字 lock CAS、原子变量 ThreadLocl:简单来说就是让每个线程,对同一个变量,都有自己的独有副本,每个线程实际访问的对象都是自己的,自然也就不存在线程安全问题了。 volatile CopyOnWrite写时复制
多线程
2 创建线程的方法
继承Thread类
public class ThreadCreateTest {
public static void main(String[] args) {
new MyThread().start();
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().getId());
}
}
实现Runable接口
public class RunableCreateTest {
public static void main(String[] args) {
MyRunnable runnable = new MyRunnable();
new Thread(runnable).start();
}
}
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().getId());
}
}
通过Callable和Future创建线程
public class CallableCreateTest {
public static void main(String[] args) throws Exception {
MyCallable callable = new MyCallable();
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();
Integer sum = futureTask.get();
System.out.println(Thread.currentThread().getName() + Thread.currentThread().getId() + "=" + sum);
}
}
class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().getId() + "\t" + new Date() + " \tstarting...");
int sum = 0;
for (int i = 0; i <= 100000; i++) {
sum += i;
}
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().getId() + "\t" + new Date() + " \tover...");
return sum;
}
}
线程池方式创建
实际开发中,阿里巴巴开发插件一直提倡使用线程池创建线程,原因在下方会解释,所以上面的代码我就只简写了一些demo
2.1 线程池创建线程
降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗(主要)。 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性。
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。 newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public class ThreadPoolExecutor extends AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService
2.2 ThreadPoolExecutor介绍
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize: 线程池的核心线程数,即便线程池里没有任何任务,也会有corePoolSize个线程在候着等任务。 maximumPoolSize: 最大线程数,不管提交多少任务,线程池里最多工作线程数就是maximumPoolSize。 keepAliveTime: 线程的存活时间。当线程池里的线程数大于corePoolSize时,如果等了keepAliveTime时长还没有任务可执行,则线程退出。 unit: 这个用来指定keepAliveTime的单位,比如秒:TimeUnit.SECONDS。 BlockingQueue: 一个阻塞队列,提交的任务将会被放到这个队列里。 threadFactory: 线程工厂,用来创建线程,主要是为了给线程起名字,默认工厂的线程名字:pool-1-thread-3。 handler: 拒绝策略,当线程池里线程被耗尽,且队列也满了的时候会调用。
2.2.1BlockingQueue
2.2.2 ArrayBlockingQueue
package map;
import java.util.concurrent.*;
public class MyTestMap {
private static final int maxSize = 5;
public static void main(String[] args){
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(maxSize);
new Thread(new Productor(queue)).start();
new Thread(new Customer(queue)).start();
}
}
class Customer implements Runnable {
private BlockingQueue<Integer> queue;
Customer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
this.cusume();
}
private void cusume() {
while (true) {
try {
int count = (int) queue.take();
System.out.println("customer正在消费第" + count + "个商品===");
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Productor implements Runnable {
private BlockingQueue<Integer> queue;
private int count = 1;
Productor(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
this.product();
}
private void product() {
while (true) {
try {
queue.put(count);
System.out.println("生产者正在生产第" + count + "个商品");
count++;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
2.2.3 LinkedBlockingQueue
2.2.4 LinkedBlockingQueue和ArrayBlockingQueue的主要区别
ArrayBlockingQueue的初始化必须传入队列大小,LinkedBlockingQueue则可以不传入 ArrayBlockingQueue用一把锁控制并发,LinkedBlockingQueue俩把锁控制并发,锁的细粒度更细。即前者生产者消费者进出都是一把锁,后者生产者生产进入是一把锁,消费者消费是另一把锁。 ArrayBlockingQueue采用数组的方式存取,LinkedBlockingQueue用Node链表方式存取
2.2.5 handler拒绝策略
AbortPolicy:不处理,直接抛出异常。 CallerRunsPolicy:只用调用者所在线程来运行任务,即提交任务的线程。 DiscardOldestPolicy:LRU策略,丢弃队列里最近最久不使用的一个任务,并执行当前任务。 DiscardPolicy:不处理,丢弃掉,不抛出异常。
2.2.6 线程池五种状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING:在这个状态的线程池能判断接受新提交的任务,并且也能处理阻塞队列中的任务 SHUTDOWN:处于关闭的状态,该线程池不能接受新提交的任务,但是可以处理阻塞队列中已经保存的任务,在线程处于RUNNING状态,调用shutdown()方法能切换为该状态。 STOP:线程池处于该状态时既不能接受新的任务也不能处理阻塞队列中的任务,并且能中断现在线程中的任务。当线程处于RUNNING和SHUTDOWN状态,调用shutdownNow()方法就可以使线程变为该状态 TIDYING:在SHUTDOWN状态下阻塞队列为空,且线程中的工作线程数量为0就会进入该状态,当在STOP状态下时,只要线程中的工作线程数量为0就会进入该状态。 TERMINATED:在TIDYING状态下调用terminated()方法就会进入该状态。可以认为该状态是最终的终止状态。
判断核心线程是否已满,是进入队列,否:创建线程 判断等待队列是否已满,是:查看线程池是否已满,否:进入等待队列 查看线程池是否已满,是:拒绝,否创建线程
2.3 深入理解ThreadPoolExecutor
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
创建Worker对象,同时也会实例化一个Thread对象。在创建Worker时会调用threadFactory来创建一个线程。 启动启动这个线程
2.3.1线程池中ctl属性的作用是什么?
ctl属性包含两个概念:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
runState:即rs 表明当前线程池的状态,是否处于Running,Shutdown,Stop,Tidying,
workerCount:即wc 表明当前有效的线程数
private static final int COUNT_BITS = Integer.SIZE - 3;
1110 0000 0000 0000 0000 0000 0000 0000
|||
31~29位
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
workerCountOf
这个函数传入ctl之后,是通过ctl&CAPACITY操作来获取当前运行线程总数的。也就是RunningState|WorkCount&CAPACITY,算出来的就是低28位的值。因为CAPACITY得到的就是高3位(29-31位)位0,低28位(0-28位)都是1,所以得到的就是ctl中低28位的值。runStateOf
这个方法的话,算的就是RunningState|WorkCount&CAPACITY,高3位的值,因为CAPACITY是CAPACITY的取反,所以得到的就是高3位(29-31位)为1,低28位(0-28位)为0,所以通过&运算后,所得到的值就是高3为的值。总之这是个
2.3.2 shutdownNow和shutdown的区别
shutdown会把线程池的状态改为SHUTDOWN,而shutdownNow把当前线程池状态改为STOP
2.3.3 线程复用原理
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
默认的是不允许
2.4 CountDownLatch和CyclicBarrier区别:
countDownLatch是一个计数器,线程完成一个记录一个,计数器递减,只能只用一次 CyclicBarrier的计数器更像一个阀门,需要所有线程都到达,然后继续执行,计数器递增,提供reset功能,可以多次使用
3. 多线程间通信的几种方式
题目:有两个线程A、B,A线程向一个集合里面依次添加元素"abc"字符串,一共添加十次,当添加到第五次的时候,希望B线程能够收到A线程的通知,然后B线程执行相关的业务操作。
3.1 使用 volatile 关键字
package thread;
public class MyThreadTest {
public static void main(String[] args) throws Exception {
notifyThreadWithVolatile();
}
private static volatile boolean flag = false;
private static void notifyThreadWithVolatile() throws Exception {
Thread thc = new Thread("线程A"){
@Override
public void run() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
flag = true;
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
break;
}
System.out.println(Thread.currentThread().getName() + "====" + i);
}
}
};
Thread thd = new Thread("线程B") {
@Override
public void run() {
while (true) {
while (flag) {
System.out.println(Thread.currentThread().getName() + "收到通知");
System.out.println("do something");
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ;
}
}
}
};
thd.start();
Thread.sleep(1000L);
thc.start();
}
}
3.2 锁机制
如果看到这里,说明你喜欢这篇文章,请 转发、点赞。同时 标星(置顶)本公众号可以第一时间接受到博文推送。
推荐阅读
1. MySQL 大表优化方案
2. Spring Boot+RabbitMQ 实现延迟消息
获取方式:点“ 在看,关注公众号 Java后端 并回复 777 领取,更多内容陆续奉上。
喜欢文章,点个在看